package com.googlecode.javarpctp.communication;

import com.googlecode.javarpctp.remote.WaitingCallMessage;
import com.googlecode.javarpctp.test.common.ThreadListener;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import javax.swing.event.EventListenerList;
import com.googlecode.javarpctp.test.common.Logger;
import com.googlecode.javarpctp.test.common.ThreadCreatedEvent;

public class ConnectionHandler implements Runnable {

    private Socket connection;
    private ObjectOutputStream output;
    private ObjectInputStream input;
    private ConnectionState state;
    protected EventListenerList listenerList = new EventListenerList();
    private ConcurrentHashMap<Integer, WaitingCallMessage> waitingResponsiveMessages = new ConcurrentHashMap<Integer, WaitingCallMessage>();
    private boolean waitingDisconnection;
    private StopCause stopCause;
    private Exception stopException;

    public ConnectionHandler(Socket connection) throws SocketException {
        this.connection = connection;
        this.setState(ConnectionState.CREATED);
        this.init();
    }

    private void init() throws SocketException {
        this.output = null;
        this.input = null;
        this.waitingResponsiveMessages.clear();
        this.waitingDisconnection = false;
        this.stopCause = null;
        this.stopException = null;
        this.configure();
    }

    private void configure() throws SocketException {
        this.connection.setSoTimeout(1500);
    }

    @Override
    public void run() {
        try {
            this.init();
            this.getStreams();
            this.processMessages();
        } catch (Exception ex) {
            this.stop(ex);
        } finally {
            try {
                this.closeStreams();
            } catch (Exception ex) {
                Logger.getInstance().log(Level.FINER, null, ex);
            }
        }

        this.setState(ConnectionState.STOPED);
    }

    public void getStreams() throws IOException {
        this.output = new ObjectOutputStream(this.connection.getOutputStream());
        this.output.flush();
        this.input = new ObjectInputStream(this.connection.getInputStream());
    }

    public void closeStreams() throws IOException {
        this.output.close();
        this.output = null;
        this.input.close();
        this.input = null;
    }

    public void stop() {
        this.setState(ConnectionState.STOPPING);
        this.stopCause = StopCause.REQUESTED;
    }

    public void stop(Exception ex) {
        this.setState(ConnectionState.STOPPING);
        this.stopCause = StopCause.EXCEPTION;
        this.stopException = ex;
    }

    public void processMessages() throws Exception {
        this.setState(ConnectionState.RUNNING);
        Message message;
        while (this.getState() != ConnectionState.STOPPING) {
            try {
                message = (Message) this.input.readObject();

                if (message instanceof ResponseMessage) {
                    ResponseMessage responseMessage = (ResponseMessage) message;
                    if (this.waitingResponsiveMessages.containsKey(responseMessage.getMessageID())) {
                        WaitingCallMessage waitingCallMessage = this.waitingResponsiveMessages.get(responseMessage.getMessageID());
                        waitingCallMessage.getResponsiveMessage().setResponse(responseMessage);
                        synchronized (waitingCallMessage.getWaitingThread()) {
                            waitingCallMessage.getWaitingThread().notify();
                        }
                        this.waitingResponsiveMessages.remove(responseMessage.getMessageID());
                    }
                } else {
                    fireCustomMessageRecieved(new CustomMessageRecievedEvent(ConnectionHandler.this, message));                    
                }
                this.fireMessageTransported(new ConnectionHandlerMessageTransportEvent(this, MessageDirection.RECIEVED, message));
            } catch (SocketTimeoutException ex) {
                // Occurs when timeout exceeds
            } catch (EOFException ex) {
                // Occurs when otherside close the connection
                if (this.isWaitingDisconnection()) {
                    stop();
                } else {
                    stop(ex);
                }
            } catch (SocketException ex) {
                // Occurs when connection is lost
                stop(ex);
            }
        }
    }

    public void sendMessage(Message message) throws IOException {
        this.output.writeObject(message);
        this.output.flush();
        this.fireMessageTransported(new ConnectionHandlerMessageTransportEvent(this, MessageDirection.SENDED, message));
    }
    
    public Object sendMessageAndWaitResponse(ResponsiveMessage message) throws IOException, InterruptedException, Throwable {
        this.waitingResponsiveMessages.put(message.getMessageID(), new WaitingCallMessage(message, Thread.currentThread()));
        this.sendMessage(message);
        synchronized (Thread.currentThread()) {
            Thread.currentThread().wait();
        }

        ResponseMessage responseMessage = message.getResponse();
        if (responseMessage.getThrown() != null) {
            throw responseMessage.getThrown().getCause();
        }

        return responseMessage.getData();
    }    

    public ConnectionState getState() {
        return state;
    }

    private void setState(ConnectionState state) {
        this.state = state;
        this.fireConnectionStateChanged(new ConnectionStateChangedEvent(this, state));
    }

    public StopCause getStopCause() {
        return stopCause;
    }

    public Exception getStopException() {
        return stopException;
    }

    public boolean isWaitingDisconnection() {
        return waitingDisconnection;
    }

    public void setWaitingDisconnection(boolean waitingDisconnection) {
        this.waitingDisconnection = waitingDisconnection;
    }

    // <editor-fold defaultstate="collapsed" desc="Events">
    public void addConnectionStateChangedListener(ConnectionStateChangedListener listener) {
        this.listenerList.add(ConnectionStateChangedListener.class, listener);
    }

    public void removeConnectionStateChangedListener(ConnectionStateChangedListener listener) {
        this.listenerList.remove(ConnectionStateChangedListener.class, listener);
    }

    void fireConnectionStateChanged(ConnectionStateChangedEvent evt) {
        Object[] listeners = this.listenerList.getListenerList();
        // Each listener occupies two elements - the first is the listener class
        // and the second is the listener instance
        for (int i = 0; i < listeners.length; i += 2) {
            if (listeners[i] == ConnectionStateChangedListener.class) {
                ((ConnectionStateChangedListener) listeners[i + 1]).stateChanged(evt);
            }
        }
    }

    public void addCustomMessageRecievedListener(CustomMessageRecievedListener listener) {
        this.listenerList.add(CustomMessageRecievedListener.class, listener);
    }

    public void removeCustomMessageRecievedListener(CustomMessageRecievedListener listener) {
        this.listenerList.remove(CustomMessageRecievedListener.class, listener);
    }

    void fireCustomMessageRecieved(CustomMessageRecievedEvent evt) throws Exception {
        Object[] listeners = this.listenerList.getListenerList();
        // Each listener occupies two elements - the first is the listener class
        // and the second is the listener instance
        for (int i = 0; i < listeners.length; i += 2) {
            if (listeners[i] == CustomMessageRecievedListener.class) {
                ((CustomMessageRecievedListener) listeners[i + 1]).customMessageRecieved(evt);
            }
        }
    }

    public void addThreadListener(ThreadListener listener) {
        this.listenerList.add(ThreadListener.class, listener);
    }

    public void removeThreadListener(ThreadListener listener) {
        this.listenerList.remove(ThreadListener.class, listener);
    }

    void fireThreadCreated(ThreadCreatedEvent evt) {
        Object[] listeners = this.listenerList.getListenerList();
        // Each listener occupies two elements - the first is the listener class
        // and the second is the listener instance
        for (int i = 0; i < listeners.length; i += 2) {
            if (listeners[i] == ThreadListener.class) {
                ((ThreadListener) listeners[i + 1]).threadCreated(evt);
            }
        }
    }

    public void addConnectionHandlerMessageTransportListener(ConnectionHandlerMessageTransportListener listener) {
        this.listenerList.add(ConnectionHandlerMessageTransportListener.class, listener);
    }

    public void removeConnectionHandlerMessageTransportListener(ConnectionHandlerMessageTransportListener listener) {
        this.listenerList.remove(ConnectionHandlerMessageTransportListener.class, listener);
    }

    void fireMessageTransported(ConnectionHandlerMessageTransportEvent evt) {
        Object[] listeners = listenerList.getListenerList();
        // Each listener occupies two elements - the first is the listener class
        // and the second is the listener instance
        for (int i = 0; i < listeners.length; i += 2) {
            if (listeners[i] == ConnectionHandlerMessageTransportListener.class) {
                ((ConnectionHandlerMessageTransportListener) listeners[i + 1]).messageTransported(evt);
            }
        }
    }
    //</editor-fold>
}
